/**
 * Copyright @2019 Josin All Rights Reserved.
 * Author: Josin
 * Email : xeapplee@gmail.com
 * Hello EveryBody who use this project: mbinlogmq
 * This is the joke project, for that, i want to know more about the DB engine Data type
 * store mechanism, if you enjoy it, you can fullfill it's feature.
 * Thank you welcome every body like it.
 */

#include <mysql_binlog.h>
#include <fc_config.h>
#include <fc_string.h>
#include <mb_socket.h>
#include <binary_log_types.h>
#include <mysql_binlog.h>
#include <fc_xml.h>
#include <fc_xml_config.h>
#include <fc_log.h>
#include <mysql_binlog.h>
#include <cJSON.h>
#include <mysql_binlog.h>
#include <curl/curl.h>
#include <endian.h>
#include "mysql_binlog.h"

/**
 * @brief NOTICE
 * Process-wide state shared by the binlog parser and the RabbitMQ publisher.
 *
 * queryInstance  MySQL client handle used by mb_query_table_field_length();
 *                while it is NULL that function returns NULL without querying.
 *                NOTE(review): the original note says this handle must not be
 *                closed and is kept alive by a keep-alive ACK roughly every
 *                10 minutes - that logic is not in this part of the file,
 *                confirm where it lives.
 * rmqData        List of RMQ_NODE entries waiting to be published to RabbitMQ
 *                (filled by the event parsers, drained by rmq_publish()).
 * cxmlRmq        RabbitMQ settings read by rmq_publish() and
 *                rmq_make_message(): host, port, vhost, exchange name, user,
 *                password, routing_key and delivery_mode.
 * rmqFd          Not referenced in this part of the file.
 * mutex / cond   Mutex and condition variable used by rmq_publish() to sleep
 *                until it is signalled.
 */
GLOBAL_VARS MYSQL      *queryInstance = NULL;
GLOBAL_VARS FCL_LIST   *rmqData = NULL;
GLOBAL_VARS CXML_rmq   *cxmlRmq = NULL;
GLOBAL_VARS int         rmqFd;
GLOBAL_VARS
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
GLOBAL_VARS
pthread_cond_t  cond  = PTHREAD_COND_INITIALIZER;

/**
 * @brief NOTICE
 * Allocate a zero-filled LIST_DATA node and attach a fresh key string to it.
 *
 * @return the new node, or NULL when the node itself cannot be allocated.
 *         trash_list_data() is the matching destructor.
 */
LIST_DATA *new_list_data()
{
    LIST_DATA *node = malloc( sizeof *node );

    if ( node == NULL )
    {
        return NULL;
    }

    e_memzero(node, sizeof *node);
    node->key = new_cstring();

    return node;
}

/**
 * @brief NOTICE
 * Wrap a JSON text in a zero-filled RMQ_NODE so it can be queued for sending.
 *
 * @param resp  message text; the pointer is stored as-is (not copied) and is
 *              later released by trash_rmq_node().
 * @return the new node with status 0, or NULL when allocation fails.
 */
RMQ_NODE *new_rmq_node(char *resp)
{
    RMQ_NODE *node = malloc( sizeof *node );

    if ( node != NULL )
    {
        e_memzero(node, sizeof *node);
        node->status   = 0;
        node->response = resp;
    }

    return node;
}

/**
 * @brief NOTICE
 * Destructor for an RMQ_NODE: releases the stored response text (when there
 * is one) and then the node itself.
 *
 * @param ptr  the RMQ_NODE to release, may be NULL.
 * @return FALSE when ptr is NULL, TRUE otherwise.
 */
int trash_rmq_node(void *ptr)
{
    RMQ_NODE *node = ptr;

    if ( node == NULL )
    {
        return FALSE;
    }

    if ( node->response != NULL )
    {
        e_memfree(node->response);
    }
    e_memfree(node);

    return TRUE;
}

/**
 * @brief NOTICE
 * Signal handler.
 *
 * SIGUSR2 terminates the current process with exit status 0.
 * SIGUSR1 is accepted but does nothing yet (placeholder for things such as
 * reloading the config file). Every other signal is ignored.
 *
 * NOTE(review): exit() is not on the POSIX list of async-signal-safe
 * functions; consider _exit() or setting a flag that the main loop checks.
 *
 * @param signo    the delivered signal number
 * @param siginfo  unused
 * @param context  unused
 */
void sig_callback(int signo, siginfo_t *siginfo, void *context)
{
    if ( signo == SIGUSR2 )
    {
        exit(0);
    }

    /* SIGUSR1 and all remaining signals: nothing to do for now. */
}

/**
 * @brief NOTICE
 * libcurl write callback that collects the RabbitMQ response body.
 *
 * The chunk handed over by libcurl is NOT NUL-terminated, so it is logged
 * with an explicit length (%.*s) rather than as a plain C string.
 * NOTE(review): this assumes logger() forwards its format to a printf-style
 * formatter - confirm.
 *
 * @param data     start of the received chunk
 * @param size     element size of the chunk
 * @param nmem     number of elements in the chunk
 * @param content  CSTRING the chunk is appended to
 * @return the number of bytes consumed (size * nmem).
 */
static size_t process_data(void *data, size_t size, size_t nmem, CSTRING *content)
{
    size_t az = size * nmem;

    new_cstring_add_string(content, (char *)data, az);
    logger(LOG_INFO, "RabbitMQ response:\n\t%.*s\n", (int)az, (char *)data);

    return az;
}

/**
 * @brief NOTICE
 * Drop every RMQ node that no longer needs to be published, i.e. whose status
 * is non-zero or which carries no payload at all. The last node of the list
 * is always kept, even when it is already finished.
 *
 * Nodes still waiting to be sent (status == 0) stay linked in their original
 * order: a finished node is unlinked from between its neighbours instead of
 * moving the list head past it, which used to orphan (and leak) every pending
 * node in front of it. The predecessor is tracked during the walk instead of
 * trusting the stored prev pointers.
 *
 * NOTE(review): assumes FCL_NODE_NEXT_P() yields an assignable lvalue, like
 * FCL_NODE_PREV_P() and FCL_LIST_HEAD_P() already do in this file - confirm.
 * NOTE(review): nothing here takes `mutex`; presumably the caller serialises
 * access to rmqData - verify.
 *
 * @return always TRUE.
 */
int rmq_data_remove_to_end()
{
    FCL_NODE    *fnode;
    FCL_NODE    *next;
    FCL_NODE    *kept = NULL;   /* last node that stayed in the list */
    RMQ_NODE    *rmqNode;

    if ( rmqData == NULL || rmqData->num <= 1 )
    {
        return TRUE;
    }

    for ( fnode = FCL_LIST_HEAD_P(rmqData); fnode != NULL; fnode = next )
    {
        next = FCL_NODE_NEXT_P(fnode);

        /* The tail is never removed. */
        if ( next == NULL ) break;

        rmqNode = FCL_NODE_DATA_P(fnode);

        /* Still waiting to be published: keep it where it is. */
        if ( rmqNode != NULL && rmqNode->status == 0 )
        {
            kept = fnode;
            continue;
        }

        /* Unlink fnode from between `kept` and `next`. */
        if ( kept == NULL )
        {
            FCL_LIST_HEAD_P(rmqData) = next;
        }
        else
        {
            FCL_NODE_NEXT_P(kept) = next;
        }
        FCL_NODE_PREV_P(next) = kept;

        /* Keep the element count in step with the links. */
        if ( rmqData->num > 0 )
        {
            rmqData->num--;
        }

        trash_rmq_node(rmqNode);
        e_memfree(fnode);
    }

    return TRUE;
}

/**
 * @brief NOTICE
 * Build the JSON body for a RabbitMQ publish request. The result looks like:
 *  {"delivery_mode":"<cfg>","payload":"<payload>","payload_encoding":"string",
 *   "routing_key":"<cfg>","properties":{}}
 * delivery_mode and routing_key are taken from the global cxmlRmq settings
 * and are both emitted as JSON strings.
 *
 * @param payload  text placed into the "payload" member
 * @return the unformatted JSON text produced by cJSON_PrintUnformatted();
 *         the caller owns it and must release it.
 */
char *rmq_make_message(char *payload)
{
    struct cJSON *message    = cJSON_CreateObject();
    struct cJSON *properties = cJSON_CreateObject();
    char         *text;

    cJSON_AddStringToObject(message, "delivery_mode", cxmlRmq->delivery_mode->sval->s);
    cJSON_AddStringToObject(message, "payload", payload);
    cJSON_AddStringToObject(message, "payload_encoding", "string");
    cJSON_AddStringToObject(message, "routing_key", cxmlRmq->routing_key->sval->s);

    /* Empty "properties" object; `message` takes ownership of it. */
    cJSON_AddItemToObject(message, "properties", properties);

    text = cJSON_PrintUnformatted(message);
    cJSON_Delete(message);

    return text;
}

/**
 * @brief NOTICE
 * Extract a run of bits from a value that is `length` bits wide.
 *
 * Bits are numbered from the most significant end of the `length`-bit field:
 * the slice starts at bit `start` and is `size` bits long.
 * Example: binlog_slice_bit(0xABCD, 4, 8, 16) == 0xBC.
 *
 * All work is done on unsigned 64-bit values, so slices of 31 bits or more no
 * longer overflow the `int` shift the mask used to be built with.
 *
 * @param src_bits  the value holding the bit field
 * @param start     index of the first bit of the slice, counted from the top
 * @param size      number of bits in the slice
 * @param length    total width of the bit field in bits
 * @return the slice as a non-negative number; 0 when size <= 0 or when the
 *         requested slice does not fit into the field (start + size > length)
 *         or the shift would be 64 bits or more.
 */
long binlog_slice_bit(long long src_bits, int start, int size, int length)
{
    unsigned long long bits  = ( unsigned long long )src_bits;
    unsigned long long mask;
    int                shift = length - ( start + size );

    /* A negative or oversized shift count is undefined behaviour in C. */
    if ( size <= 0 || shift < 0 || shift >= 64 )
    {
        return 0;
    }

    mask = ( size >= 64 ) ? ~0ULL : ( ( 1ULL << size ) - 1ULL );

    return ( long )( ( bits >> shift ) & mask );
}

/**
 * @brief NOTICE
 * Used for the Child thread for the job
 * to publish the event to the RMQ.
 */
void *rmq_publish(void *ptr)
{
    FCL_NODE     *fnode;
    RMQ_NODE     *rmqNode;
    
    size_t        bsize;
    char         *postJosnData;
    char         *base64Encode;
    CSTRING      *result;
    CSTRING      *base64;
    struct cJSON *response;
    struct cJSON *rourted;
    
after_published:
    pthread_cond_wait(&cond, &mutex);
    
    if ( rmqData == NULL )
    {
        pthread_mutex_unlock(&mutex);
        goto after_published;
    }
/**
 * @brief NOTICE
 * Some RabbitMQ request body such as the following:
 *
 * POST / HTTP/1.1\r\n
 * Authorization: Basic xxxx\r\n
 * User-Agent: mbinlog/1.0\r\n
 * Content-Length: xxx\r\n
 * Content-Type: application/json;charset=UTF-8\r\n
 * \r\n
 * {"xx":"xx"}
 */
    for ( fnode = FCL_LIST_HEAD_P(rmqData); fnode != NULL; ) {
        
        rmqNode = FCL_NODE_DATA_P(fnode);
        
        
        if ( !rmqNode || rmqNode->status ) {
            fnode = FCL_NODE_NEXT_P(fnode);
            continue;
        }
        
/**
 * @brief NOTICE
 *  Use cURL to send the request to the RabbitMQ
 */
        CURL *curl = curl_easy_init();
        CSTRING *cstring = new_cstring();

        result = new_cstring();
        if ( cxmlRmq->host->sval->s[0] == 'h'
            || cxmlRmq->host->sval->s[0] == 'H' )
        {
            new_cstring_add_string( result, cxmlRmq->host->sval->s, ( long )cxmlRmq->host->sval->l );
        }
        else
        {
            new_cstring_add_string( result, E_STRL("http://"));
            new_cstring_add_string( result, cxmlRmq->host->sval->s, ( long )cxmlRmq->host->sval->l );
        }
        new_cstring_add_string( result, E_STRL(":"));
        new_cstring_add_string( result, cxmlRmq->port->sval->s, ( long )cxmlRmq->port->sval->l );
        new_cstring_add_string( result, E_STRL("/api/exchanges/"));

        if ( cxmlRmq->vhost->sval->l )
        {
            for ( int i = 0; i < cxmlRmq->vhost->sval->l; ++i )
            {
                if ( cxmlRmq->vhost->sval->s[i] == '/' )
                {
                    new_cstring_add_string(result, E_STRL("%2F"));
                } else {
                    new_cstring_add_char(result, cxmlRmq->vhost->sval->s[i]);
                }
            }
        }
        else
        {
            new_cstring_add_string(result, E_STRL("%2F"));
        }
        new_cstring_add_char(result, '/');
        if ( cxmlRmq->name->sval->l )
        {
            new_cstring_add_string( result, cxmlRmq->name->sval->s, ( long )cxmlRmq->name->sval->l );
        }
        else
        {
            new_cstring_add_string( result, E_STRL("amq.default"));
        }
        new_cstring_add_string(result, E_STRL("/publish"));

        /* http://127.0.0.1:15672/api/exchanges/%2F/amq.default/publish */
        curl_easy_setopt(curl, CURLOPT_URL, result->s);
        new_cstring_free(result);

        postJosnData = rmq_make_message(rmqNode->response);
        curl_easy_setopt(curl, CURLOPT_POSTFIELDS, postJosnData);

        curl_easy_setopt(curl, CURLOPT_POST, 1);
        curl_easy_setopt(curl, CURLOPT_WRITEDATA, cstring);
        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &process_data);

        struct curl_slist *headers;

/**
 * @brief NOTICE
 * Add the Authorization
 */
        result = new_cstring();
        new_cstring_add_string(result, E_STRL("Authorization: Basic "));

        base64 = new_cstring();
        new_cstring_add_string( base64, cxmlRmq->user->sval->s, ( long )cxmlRmq->user->sval->l );
        new_cstring_add_char(   base64, ':');
        new_cstring_add_string( base64, cxmlRmq->password->sval->s, ( long )cxmlRmq->password->sval->l );

        bsize = base64_encode( ( const BYTE * )base64->s, NULL, base64->l, 1 );
        base64Encode  = malloc( sizeof(char) * bsize );
        base64_encode( ( const BYTE * )base64->s, ( BYTE * )base64Encode, base64->l, 1);
        new_cstring_add_string( result, base64Encode, bsize);
        free(base64Encode);

        headers = curl_slist_append(headers, result->s);
        new_cstring_free(result);

        headers = curl_slist_append(headers, "Content-Type: application/json;charset=UTF-8");
        headers = curl_slist_append(headers, "UserAgent: mbinlog/1.0");
        curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);

        if ( curl_easy_perform(curl) == CURLE_OK )
        {
            logger(LOG_NOTICE,
                "Send the message to the RabbitMQ successfully.\n\tmsg:{%s}\n", postJosnData);
        }

        long retcode = 0;
        CURLcode ccode = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &retcode);
        if ( ccode == CURLE_OK && retcode == 200 )
        {
            response = cJSON_Parse(cstring->s);
            rourted = cJSON_GetObjectItem(response, "routed");
            if ( rourted->type == cJSON_True ) {
                rmqNode->status = 1;
                logger(
                    LOG_INFO,
                    "RabbitMQ response the Request successfully\n\t{%s},"
                    "The next time will be deleted from Publish list,status:1\n",
                     postJosnData
                 );
            }
            cJSON_Delete(response);
        }
        new_cstring_free(cstring);
        curl_slist_free_all(headers);
        headers = NULL;
        curl_easy_cleanup(curl);
        e_memfree(postJosnData);
/**
 * @brief NOTICE
 *  Check wheather there is another node, if yes, remove the current node
 */
        if ( FCL_NODE_NEXT_P(fnode) != NULL )
        {
            FCL_NODE *tempNode = FCL_NODE_NEXT_P(fnode);
            trash_rmq_node(FCL_NODE_DATA_P(fnode));
            e_memfree(fnode);
            fnode = tempNode;
            FCL_LIST_HEAD_P(rmqData) = fnode;
        }
        else
        {
            fnode = FCL_NODE_NEXT_P(fnode);
        }
        
    }
    
    pthread_mutex_unlock(&mutex);
    goto after_published;
}

/**
 * @brief NOTICE
 * Destructor for a LIST_DATA node: releases the nva and pva buffers when they
 * are set, then the key string, then the node itself.
 *
 * @param ptr  the LIST_DATA to release, may be NULL.
 * @return FALSE when ptr is NULL, TRUE otherwise.
 */
int trash_list_data(void *ptr)
{
    LIST_DATA *item = ptr;

    if ( item == NULL )
    {
        return FALSE;
    }

    if ( item->nva )
    {
        e_memfree(item->nva);
    }
    if ( item->pva )
    {
        e_memfree(item->pva);
    }
    new_cstring_free(item->key);
    e_memfree(item);

    return TRUE;
}

/**
 * @brief NOTICE
 * Find the first LIST_DATA in the list whose id equals `id`.
 *
 * The empty-list guard used to be inverted (it returned NULL for every list
 * that contained elements), so no lookup could ever succeed.
 *
 * @param list  list of LIST_DATA entries, may be NULL
 * @param id    id to look for
 * @return the matching entry, or NULL when the list is NULL, empty, or holds
 *         no entry with that id.
 */
LIST_DATA *new_list_find_id(FCL_LIST *list, int id)
{
    FCL_NODE  *fnode;
    LIST_DATA *listData;

    if ( !list || !FCL_LIST_NUM_P(list) ) return NULL;

    FCL_LIST_FOREACH_HEAD(list, fnode) {
        listData = FCL_NODE_DATA_P(fnode);
        if ( listData && listData->id == id )
        {
            return listData;
        }
    } FCL_LIST_FOREACH_END();

    return NULL;
}

/**
 * @brief NOTICE
 * Find the first LIST_DATA in the list whose val equals `val`.
 *
 * The empty-list guard used to be inverted (it returned NULL for every list
 * that contained elements), so no lookup could ever succeed.
 *
 * @param list  list of LIST_DATA entries, may be NULL
 * @param val   value to look for
 * @return the matching entry, or NULL when the list is NULL, empty, or holds
 *         no entry with that val.
 */
LIST_DATA *new_list_find_val(FCL_LIST *list, long val)
{
    FCL_NODE  *fnode;
    LIST_DATA *listData;

    if ( !list || !FCL_LIST_NUM_P(list) ) return NULL;

    FCL_LIST_FOREACH_HEAD(list, fnode) {
        listData = FCL_NODE_DATA_P(fnode);
        if ( listData && listData->val == val )
        {
            return listData;
        }
    } FCL_LIST_FOREACH_END();

    return NULL;
}

/**
 * @brief NOTICE
 * Write a query command packet straight onto the MySQL socket, as a stand-in
 * for mysql_real_query() on the replication connection.
 *
 * The packet is sent as three pieces with one writev(): the packet header
 * (payload length 1 + query_len, sequence id 0), the command byte 0x03, and
 * the query text. Reading the server's answer is not implemented here.
 *
 * @param mysql_fd   connected socket
 * @param query      query text
 * @param query_len  length of the query text in bytes
 * @return whatever writev() returned, narrowed to int.
 */
int mb_query(int mysql_fd, char *query, unsigned long query_len)
{
    MYSQL_PACKET_HEADER header;
    unsigned char       comQuery = 0x03;
    ssize_t             written;
    struct iovec        parts[3] = {
        { .iov_base = &header,   .iov_len = sizeof(header) },
        { .iov_base = &comQuery, .iov_len = sizeof(comQuery) },
        { .iov_base = query,     .iov_len = sizeof(char) * query_len },
    };

    header.plen = ( unsigned int )( 1 + query_len );
    header.psid = 0;

    written = writev(mysql_fd, parts, 3);

    return written;
}

/**
 * @brief NOTICE
 * Ask INFORMATION_SCHEMA.COLUMNS for the column metadata of one table.
 * Each result row holds, in this order: CHARACTER_OCTET_LENGTH, COLUMN_NAME,
 * NUMERIC_PRECISION, NUMERIC_SCALE.
 *
 * The query goes through the global queryInstance handle, not through the
 * replication connection.
 *
 * mysql_real_query() reports failure with any non-zero value, so the result
 * is now compared against 0 instead of -1; before, a failed query still went
 * on to mysql_store_result().
 *
 * NOTE(review): schema and table are pasted into the SQL text unescaped; a
 * name containing a single quote breaks the statement. Consider
 * mysql_real_escape_string().
 *
 * @param table      table name (does not need to be NUL-terminated)
 * @param table_len  length of the table name in bytes
 * @param schema     NUL-terminated schema name
 * @return the stored result set (release with mysql_free_result()), or NULL
 *         when an argument is missing, queryInstance is not set, or the query
 *         fails.
 */
MYSQL_RES *mb_query_table_field_length(char *table, unsigned long table_len, char *schema)
{
    CSTRING *sql;
    int      failed;

    if ( !table || !table_len || !schema ||
        queryInstance == NULL ) {
        return NULL;
    }

    sql = new_cstring();

    new_cstring_add_string(sql,
        E_STRL("SELECT "
               "CHARACTER_OCTET_LENGTH,COLUMN_NAME, "
               "NUMERIC_PRECISION, NUMERIC_SCALE "
               " FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '"));
    new_cstring_add_string(sql, schema, strlen(schema));
    new_cstring_add_string(sql, E_STRL("' AND TABLE_NAME = '"));

    new_cstring_add_string(sql, table, table_len);
    new_cstring_add_char(sql, '\'');

    failed = mysql_real_query(queryInstance, sql->s, sql->l);
    new_cstring_free(sql);
    if ( failed != 0 ) {
        return NULL;
    }

    return mysql_store_result(queryInstance);
}

/**
 * @brief NOTICE
 * Send a COM_REGISTER_SLAVE (0x15) request to the MySQL server and read the
 * reply to find out whether the registration was accepted.
 *
 * The packet is gathered from eleven pieces and sent with one writev():
 *   header | 0x15 | server_id(4) | host_len(1) host | user_len(1) user |
 *   pass_len(1) pass | port(2) | rank(4) = 0 | master_id(4)
 * which gives a payload of 18 bytes plus the three string lengths.
 *
 * The reply is only inspected when read() actually delivered data. Before, a
 * failed read left the zero-filled reply buffer untouched and its header byte
 * 0x00 was taken for success.
 *
 * NOTE(review): server_id and slave_port are sent as the first 4 / 2 bytes of
 * their in-memory representation, which is only the wire format on a
 * little-endian host.
 *
 * @return TRUE when the whole request was written and the reply header byte
 *         is 0x00 or 0xfe, FALSE otherwise.
 */
int mb_register_slave(int mysql_fd, unsigned long master_id, unsigned long server_id,
                      char *slave_hostname, unsigned char slave_hostname_length,
                      char *slave_user, unsigned char slave_user_len,
                      char *slave_password, unsigned char slave_password_len,
                      unsigned int slave_port)
{
    typedef struct _REGISTER_SLAVE_FOOT {
        unsigned long rank: 32;
        unsigned long master_id : 32;
    } REGISTER_SLAVE_FOOT;

    struct iovec          rep[11];
    ssize_t               res;
    unsigned char         com;
    MYSQL_PACKET_HEADER   mph;
    REGISTER_SLAVE_FOOT   rsf;
    MYSQL_RESPONSE_PACKET mrp;

    e_memzero(rep, sizeof(rep));

    /* Packet header: payload length and sequence id 0. */
    e_memzero(&mph, sizeof(mph));
    mph.plen = ( unsigned int )( (int)18 + slave_hostname_length +
        slave_password_len + slave_user_len );
    mph.psid = 0;
    rep[0].iov_base = &mph;
    rep[0].iov_len  = sizeof(mph);

    /* Command byte and server id. */
    com = 0x15; /* COM_REGISTER_SLAVE */
    rep[1].iov_base = &com;
    rep[1].iov_len  = sizeof( unsigned char );

    rep[2].iov_base = &server_id;
    rep[2].iov_len  = sizeof(char) * 4;

    /* Length-prefixed host name, user and password. */
    rep[3].iov_base = &slave_hostname_length;
    rep[3].iov_len  = sizeof(char);

    rep[4].iov_base = slave_hostname;
    rep[4].iov_len  = sizeof(char) * slave_hostname_length;

    rep[5].iov_base = &slave_user_len;
    rep[5].iov_len  = sizeof( unsigned char );

    rep[6].iov_base  = slave_user;
    rep[6].iov_len   = sizeof(char) * slave_user_len;

    rep[7].iov_base  = &slave_password_len;
    rep[7].iov_len   = sizeof( unsigned char );

    rep[8].iov_base  = slave_password;
    rep[8].iov_len   = sizeof(char) * slave_password_len;

    rep[9].iov_base  = &slave_port;
    rep[9].iov_len   = sizeof( char ) * 2;

    /* Trailer: replication rank (always 0) and master id. */
    e_memzero(&rsf, sizeof(rsf));
    rsf.master_id = master_id;
    rsf.rank  = 0x0;
    rep[10].iov_base  = &rsf;
    rep[10].iov_len   = sizeof(rsf);

    /* Send the request; anything but a complete write is a failure. */
    res = writev(mysql_fd, rep, 11);
    if ( res != mph.plen + 4 )
    {
        return FALSE;
    }

    /* Read the server reply. */
    e_memzero(&mrp, sizeof(mrp));
    res = read(mysql_fd, &mrp, sizeof(mrp));
    if ( res <= 0 )
    {
        /* Error or connection closed: there is no reply to judge. */
        return FALSE;
    }

    if ( mrp.header == 0x00 || mrp.header == 0xfe )
    {
        return TRUE;
    }
    return FALSE;
}

/**
 * @brief NOTICE
 * Use this function to get the MySQL engine inner data type
 */
char *mb_get_inner_mysql_type(int type)
{
    /*
      MYSQL_TYPE_JSON=245,
      MYSQL_TYPE_NEWDECIMAL=246,
      MYSQL_TYPE_ENUM=247,
      MYSQL_TYPE_SET=248,
      MYSQL_TYPE_TINY_BLOB=249,
      MYSQL_TYPE_MEDIUM_BLOB=250,
      MYSQL_TYPE_LONG_BLOB=251,
      MYSQL_TYPE_BLOB=252,
      MYSQL_TYPE_VAR_STRING=253,
      MYSQL_TYPE_STRING=254,
      MYSQL_TYPE_GEOMETRY=255
    */
    if ( type == MYSQL_TYPE_BINARY )
    {
        return "MYSQL_TYPE_BINARY";
    }
    switch (type)
    {
        case MYSQL_TYPE_JSON:
            return "MYSQL_TYPE_JSON";
        case MYSQL_TYPE_BLOB:
            return "MYSQL_TYPE_BLOB";
        case MYSQL_TYPE_DECIMAL:
            return "MYSQL_TYPE_DECIMAL";
        case MYSQL_TYPE_NEWDECIMAL:
            return "MYSQL_TYPE_NEWDECIMAL";
        case MYSQL_TYPE_TINY_BLOB:
            return "MYSQL_TINY_BLOB";
        case MYSQL_TYPE_LONG_BLOB:
            return "MYSQL_TYPE_LONG_BLOB";
    }
    return MySQL_TYPES[type];
}

/**
 * @brief NOTICE
 * Return the length value associated with a MySQL column type code.
 *
 * Codes up to 19 are looked up in the MySQL_TYPE_LEN table. The high codes
 *   MYSQL_TYPE_JSON=245, MYSQL_TYPE_NEWDECIMAL=246, MYSQL_TYPE_ENUM=247,
 *   MYSQL_TYPE_SET=248, MYSQL_TYPE_TINY_BLOB=249, MYSQL_TYPE_MEDIUM_BLOB=250,
 *   MYSQL_TYPE_LONG_BLOB=251, MYSQL_TYPE_BLOB=252, MYSQL_TYPE_VAR_STRING=253,
 *   MYSQL_TYPE_STRING=254, MYSQL_TYPE_GEOMETRY=255
 * are mapped here; any code above 19 that is not listed yields 1.
 *
 * @param type  MySQL column type code
 * @return the length value for that type.
 */
int  mb_get_mysql_type_len(int type)
{
    if ( type <= 19 )
    {
        return MySQL_TYPE_LEN[type];
    }

    switch ( type )
    {
        case MYSQL_TYPE_NEWDECIMAL:
        case MYSQL_TYPE_DECIMAL: /* presumably never reached here: this code is below 20 in MySQL's enum */
            return 5;

        case MYSQL_TYPE_LONG_BLOB:
            return 4;

        case MYSQL_TYPE_MEDIUM_BLOB:
        case MYSQL_TYPE_BLOB:
        case MYSQL_TYPE_VAR_STRING:
        case MYSQL_TYPE_STRING:
            return 2;

        case MYSQL_TYPE_JSON:
        case MYSQL_TYPE_ENUM:
        case MYSQL_TYPE_SET:
        case MYSQL_TYPE_TINY_BLOB:
        default:
            return 1;
    }
}

/**
 * @brief NOTICE
 * Send the binlog dump command (0x12) so that the MySQL server starts
 * streaming binlog events to this fake slave.
 *
 * Layout written with a single writev():
 *   packet header | cmd(1) binlog_pos(4) flags(2)=0 server_id(4) | file name
 * The payload length is 11 plus the file name length and the sequence id of
 * this first request is zero.
 *
 * @param mysql_fd             connected socket
 * @param binlog_pos           position to start from (first 4 bytes are sent)
 * @param server_id            id of this slave (first 4 bytes are sent)
 * @param binlog_filename      binlog file name
 * @param binlog_filename_len  length of the file name in bytes
 * @return TRUE when writev() reports 15 + binlog_filename_len bytes written,
 *         FALSE otherwise. The server's answer is not read here.
 */
int mb_binlog_dump(int mysql_fd, unsigned long binlog_pos,
                    unsigned long server_id, char *binlog_filename,
                    unsigned int binlog_filename_len )
{
    struct
    {
        unsigned char cmd;
        unsigned char binlog_pos[4];
        unsigned char flags[2];
        unsigned char server_id[4];
    } dumpRequest;
    MYSQL_PACKET_HEADER  header = { 0 };
    struct iovec         parts[3];
    ssize_t              sent;

    header.plen = 11 + binlog_filename_len;
    header.psid = 0; /* the first request must carry sequence id zero */

    dumpRequest.cmd = 0x12;
    e_copymem(dumpRequest.binlog_pos,
        &binlog_pos, sizeof(dumpRequest.binlog_pos));
    dumpRequest.flags[0] = 0x00;
    dumpRequest.flags[1] = 0x00;
    e_copymem(dumpRequest.server_id,
        &server_id, sizeof(dumpRequest.server_id));

    parts[0].iov_base = &header;
    parts[0].iov_len  = sizeof(header);
    parts[1].iov_base = &dumpRequest;
    parts[1].iov_len  = sizeof(dumpRequest);
    parts[2].iov_base = binlog_filename;
    parts[2].iov_len  = binlog_filename_len;

    sent = writev(mysql_fd, parts, 3);

    return ( sent == (15 + binlog_filename_len) ) ? TRUE : FALSE;
}


/**
 * @brief NOTICE
 * Parse a Query event from the binlog stream and queue its SQL text for
 * RabbitMQ.
 *
 * Reads, in this order: the query post-header, the status variables, the
 * schema name (schema_length + 1 bytes) and finally everything that is left
 * of the packet as the query text. mph->plen is reduced by the size of every
 * read. The query text is wrapped as {"etype":E_RMQ_SQL_QUERY,"data":"<sql>"}
 * and pushed onto rmqData.
 *
 * @param mysql_fd  the MySQL socket
 * @param mph       packet header; plen holds the bytes still unread
 * @param bh        binlog event header (not used here)
 */
void mb_parse_query_event( int mysql_fd, MYSQL_PACKET_HEADER *mph, BINLOG_HEADER *bh )
{
    QUERY_EVENT_HEADER  postHeader;
    ssize_t             nread;
    long long           statusVarsLen;
    char               *statusVars;
    char               *schema;
    char               *query;
    char               *jsonText;
    struct cJSON       *message;

    e_memzero(&postHeader, sizeof(postHeader));

    nread = mb_socket_read(mysql_fd, &postHeader, sizeof(postHeader));
    if ( nread == 0 )
    {
#if FC_DEBUG
        fprintf(stderr, "Get MySQL Query Event wrong, msg: %s\n", strerror(errno));
#endif
        return ;
    }
    mph->plen -= nread;

    /* The post-header is done; the variable sized parts follow. */
    statusVarsLen = copy_data_from_little( ( char * )postHeader.status_vars_length, 2);
#if FC_DEBUG
    printf("Salve proxy id: %lld\n", copy_data_from_little( ( char * )postHeader.slave_proxy_id, 4));

    printf("Execution Time: %lld\n", copy_data_from_little( ( char * )postHeader.execution_time, 4));

    printf("Schema Length: %d\n", postHeader.schema_length);

    printf("Error Code: %lld\n", copy_data_from_little( ( char * )postHeader.error_code, 2));

    printf("Status Vars Length: %lld\n", statusVarsLen);
#endif

    /* Status variables: read and discarded. */
    statusVars = malloc( sizeof(char) * (statusVarsLen + 1));
    nread = mb_socket_read(mysql_fd, statusVars, sizeof(char) * statusVarsLen);
    statusVars[statusVarsLen] = '\0';
    mph->plen -= nread;

#if FC_DEBUG
    printf("StatusVars: %s\n", statusVars);
#endif
    free(statusVars);

    /* Schema name plus one extra byte: read and discarded. */
    schema = malloc( sizeof(char) * (postHeader.schema_length + 1));
    nread = mb_socket_read(mysql_fd, schema, sizeof(char) * (postHeader.schema_length + 1));
    mph->plen -= nread;

#if FC_DEBUG
    printf("Schema: %s\n", schema);
#endif
    free(schema);

    /* Whatever remains of the packet is taken as the query text. */
    query = malloc( sizeof(char) * (mph->plen+1));
    e_memzero(query, sizeof(char) * (mph->plen+1));
    nread = mb_socket_read(mysql_fd, query, sizeof(char) * (mph->plen));
    mph->plen -= nread;

#if FC_DEBUG
    printf("Query: %s\n\n", query);
#endif

    /* Format the JSON RabbitMQ message and queue it. */
    message = cJSON_CreateObject();
    cJSON_AddNumberToObject(message, "etype", E_RMQ_SQL_QUERY);
    cJSON_AddStringToObject(message, "data", query);
    jsonText = cJSON_PrintUnformatted(message);
    fcl_list_push(rmqData, new_rmq_node(jsonText), trash_rmq_node);
    cJSON_Delete(message);
    free(query);
}

/**
 * @brief NOTICE
 * Parse a Rotate event: an 8 byte position followed by the name of the next
 * binlog file, which fills the rest of the packet.
 *
 * The position is copied into the global echar2long scratch union; the file
 * name is only printed in debug builds. mph->plen is reduced by the size of
 * both reads.
 *
 * @param mysql_fd  the MySQL socket
 * @param mph       packet header; plen holds the bytes still unread
 * @param bh        binlog event header (not used here)
 */
void mb_parse_rotate_event( int mysql_fd, MYSQL_PACKET_HEADER *mph, BINLOG_HEADER *bh )
{
    unsigned char  nextPos[8] = { 0 };
    char          *nextFile;
    ssize_t        nread;

    nread = mb_socket_read(mysql_fd, nextPos, sizeof(nextPos));
    if ( nread == 0 )
    {
#if FC_DEBUG
        fprintf(stderr, "Get Rotate Event wrong, msg: %s\n", strerror(errno));
#endif
        return ;
    }
    mph->plen -= nread;

    e_copymem(echar2long.echar, nextPos, sizeof(nextPos));
#if FC_DEBUG
    printf("Next event log position: %ld\n", echar2long.elong);
#endif

    nextFile = malloc( sizeof(char) * (mph->plen + 1));
    nread = mb_socket_read(mysql_fd, nextFile, sizeof(char) * mph->plen);
    if ( nread == 0 )
    {
#if FC_DEBUG
        printf("Get the next binlog filename failed, msg: %s\n", strerror(errno));
#endif
    }
    /* Terminate at the expected length before plen is adjusted below. */
    nextFile[mph->plen] = '\0';
#if FC_DEBUG
    printf("Next binlog file: %s\n\n", nextFile);
#endif
    free(nextFile);
    mph->plen -= nread;
}

/**
 * @brief NOTICE
 * Parse a Format Description event and remember its per-event post-header
 * lengths in the global FORMAT_description_DATA for the parsers that follow.
 *
 * The fixed part is: binlog version (2), server version (50), create
 * timestamp (4), event header length (1). Everything left in the packet is
 * stored as the event-type header length table.
 *
 * Any table kept from an earlier event is released first and the global is
 * cleared, so after a failure eventsLengths is NULL. The table buffer is now
 * released when it cannot be filled (it used to leak) and a failed allocation
 * is no longer written to.
 *
 * @param mysql_fd  the MySQL socket
 * @param mph       packet header; plen holds the bytes still unread and is 0
 *                  after a successful parse
 * @param bh        binlog event header (not used here)
 */
void mb_parse_format_desc_event( int mysql_fd, MYSQL_PACKET_HEADER *mph, BINLOG_HEADER *bh )
{
    struct {
        unsigned char binlog_version[2];
        unsigned char mysql_server_version[50];
        unsigned char create_timestamp[4];
        unsigned char event_header_length;
    } formatInfo;

    ssize_t             res;
    char               *buf;

    e_memzero(&formatInfo, sizeof(formatInfo));

    /* Release the table of the previous event before starting over. */
    if ( FORMAT_description_DATA.eventsLengths != NULL )
    {
        e_memfree(FORMAT_description_DATA.eventsLengths);
    }
    e_memzero(&FORMAT_description_DATA, sizeof(FORMAT_description_DATA));

    res = mb_socket_read(mysql_fd, &formatInfo, sizeof(formatInfo));
    if ( !res )
    {
#if FC_DEBUG
        fprintf(stderr, "Get Format Description Event header wrong, msg: %s\n", strerror(errno));
#endif
        return ;
    }
    mph->plen -= res;
    FORMAT_description_DATA.nes = formatInfo.event_header_length;

#if FC_DEBUG
    printf("Binlog Version: V%lld\n", copy_data_from_little( ( char * )formatInfo.binlog_version, 2));

    /* The 50 byte field is not guaranteed to carry a terminator. */
    printf("MySQL Server Version: %.*s\n",
        ( int )sizeof(formatInfo.mysql_server_version),
        ( char * )formatInfo.mysql_server_version);

    printf("Create TimeStamp: %lld\n", copy_data_from_little( ( char * )formatInfo.create_timestamp, 4));

    printf("Event Header Length: %d\n", formatInfo.event_header_length);
#endif

    /* Buffer for the event type header lengths: the rest of the packet. */
    buf = malloc(sizeof(char) * mph->plen);
    if ( buf == NULL )
    {
#if FC_DEBUG
        fprintf(stderr, "Allocate Event type header buffer failed, msg: %s\n", strerror(errno));
#endif
        return ;
    }
    e_memzero(buf, sizeof(char) * mph->plen);
    res = mb_socket_read(mysql_fd, buf, sizeof(char) * mph->plen);
    if ( !res )
    {
#if FC_DEBUG
        printf("Read Event type header of Format Description Event faild, msg: %s\n", strerror(errno));
#endif
        free(buf);
        return ;
    }
    mph->plen = 0;

    /* Keep the table; the following events look their header length up in it. */
    FORMAT_description_DATA.eventsLengths = buf;
}

/**
 * @brief NOTICE
 * Parse a Table Map event and rebuild the global tableMaps list, which holds
 * one LIST_DATA per column for the row events that follow.
 *
 * Reads, in this order: post-header (table id of 4 or 6 bytes + 2 flag
 * bytes), schema name, table name, column count, one type byte per column,
 * and finally the column-meta-def + NULL-bitmask, which fills the rest of the
 * packet. mph->plen is reduced by the size of every read.
 *
 * For every column the type code and its length value are stored; the column
 * name, VARCHAR length size and DECIMAL precision/scale come from
 * mb_query_table_field_length(). When that lookup yields no result set or too
 * few rows, the defaults are used (VARCHAR length size 1, precision 10,
 * scale 0, empty name) instead of dereferencing a NULL row.
 *
 * Fixes compared to the previous version:
 *  - the result set is released when it exists (the check was inverted, so it
 *    always leaked),
 *  - the schema name and the result set are released on every early return,
 *  - a missing format description table no longer gets indexed; the 6 byte
 *    table id layout is assumed in that case,
 *  - the NULL-bitmask length is (columns + 7) / 8.
 *
 * NOTE(review): the column count is read as one byte, so tables with more
 * than 250 columns (length-encoded count) are not handled.
 *
 * @param mysql_fd  the MySQL socket
 * @param mph       packet header; plen holds the bytes still unread
 * @param bh        binlog event header (not used here)
 */
void mb_parse_table_map_event( int mysql_fd, MYSQL_PACKET_HEADER *mph, BINLOG_HEADER *bh )
{
    long long         tableId;
    LIST_DATA        *listData;
    MYSQL_RES        *mysqlRes   = NULL;
    MYSQL_ROW         mysqlRow;
    char             *schemaName = NULL;
    char             *buf;
    unsigned char     postHeader[8];
    unsigned char     len;
    int               idBytes;
    int               columns;
    int               nbt;
    int               i;
    ssize_t           res;

    /* The table map of the previous event is replaced by a fresh list. */
    if ( tableMaps != NULL )
    {
        fcl_list_destroy(tableMaps);
    }
    tableMaps = new_fcl_list();

    /* Post-header: a 4 byte table id when its length is 6, else 6 bytes. */
    idBytes = ( FORMAT_description_DATA.eventsLengths != NULL
        && FORMAT_description_DATA.eventsLengths[TABLE_MAP_EVENT - 1] == 6 ) ? 4 : 6;

    e_memzero(postHeader, sizeof(postHeader));
    res = mb_socket_read(mysql_fd, postHeader, idBytes + 2);
    if ( !res )
    {
#if FC_DEBUG
        fprintf(stderr, "Read Table map event post header wrong, msg: %s\n", strerror(errno));
#endif
        goto done;
    }
    mph->plen -= res;
    tableId = copy_data_from_little( ( char * )postHeader, idBytes);
#if FC_DEBUG
    printf("Table Id: %lld\n", tableId);
    printf("Flags: %lld\n", copy_data_from_little( ( char * )postHeader + idBytes, 2));
#endif

    /* Schema name length. */
    len = 0;
    res = mb_socket_read(mysql_fd, &len, sizeof(len));
    if ( !res )
    {
#if FC_DEBUG
        fprintf(stderr, "Read Schema length wrong, msg: %s\n", strerror(errno));
#endif
        goto done;
    }
    mph->plen -= res;
#if FC_DEBUG
    printf("Schema Name Length: %d\n", len);
#endif

    /* Schema name; the extra byte consumes the trailing [00]. */
    schemaName = malloc( sizeof(char) * (len + 1) );
    if ( schemaName == NULL )
    {
#if FC_DEBUG
        fprintf(stderr, "Allocate Schema name failed, msg: %s\n", strerror(errno));
#endif
        goto done;
    }
    res = mb_socket_read(mysql_fd, schemaName, sizeof(char) * ( len + 1 ));
    if ( !res )
    {
#if FC_DEBUG
        fprintf(stderr, "Read Schema name wrong, msg: %s\n", strerror(errno));
#endif
        goto done;
    }
    mph->plen -= res;
#if FC_DEBUG
    printf("Schema name: %s\n", schemaName);
#endif

    /* Table name length. */
    res = mb_socket_read(mysql_fd, &len, sizeof(len));
    if ( !res )
    {
#if FC_DEBUG
        fprintf(stderr, "Read table name length wrong, msg: %s\n", strerror(errno));
#endif
        goto done;
    }
    mph->plen -= res;
#if FC_DEBUG
    printf("Table name length: %d\n", len);
#endif

    /* Table name, again including the trailing [00]. */
    buf = NULL;
    res = mb_socket_read_str_len(mysql_fd, &buf, len + 1);
    if ( !res )
    {
#if FC_DEBUG
        fprintf(stderr, "Read table name wrong, msg: %s\n", strerror(errno));
#endif
        /* Who owns buf after a failed read is not visible here, so it is left alone. */
        goto done;
    }
    mph->plen -= res;

    /* Look the column metadata up while both names are at hand. */
    mysqlRes = mb_query_table_field_length(buf, len, schemaName);
#if FC_DEBUG
    printf("Table name: %s\n", buf);
#endif
    free(buf);
    free(schemaName);
    schemaName = NULL;

    /* Column count. */
    res = mb_socket_read(mysql_fd, &len, sizeof(len));
    if ( !res )
    {
#if FC_DEBUG
        fprintf(stderr, "Read Column count wrong, msg: %s\n", strerror(errno));
#endif
        goto done;
    }
    mph->plen -= res;
#if FC_DEBUG
    printf("Column Length: %d\n", len);
#endif
    columns = len;

    /* One type byte per column. */
    buf = NULL;
    res = mb_socket_read_str_len(mysql_fd, &buf, columns);
    if ( !res )
    {
#if FC_DEBUG
        fprintf(stderr, "Read Column Type meta info wrong, msg: %s\n", strerror(errno));
#endif
        goto done;
    }
    mph->plen -= res;

#if FC_DEBUG
    printf("Column Type: \n-------------------\n");
#endif
    for ( i = 0; i < columns; i++ )
    {
        listData = new_list_data();
        if ( listData == NULL )
        {
            break;
        }
        listData->id  = i + 1;
        listData->val = tableId;
        listData->ity = ( int )copy_data_from_little( buf + i, 1);
        listData->ile = mb_get_mysql_type_len(listData->ity);

        /* Metadata row of this column; NULL when the lookup gave nothing. */
        mysqlRow = mysqlRes ? mysql_fetch_row(mysqlRes) : NULL;

        if ( listData->ity == MYSQL_TYPE_VARCHAR )
        {
            /* Values longer than 255 bytes carry a 2 byte length prefix. */
            if ( mysqlRow != NULL && mysqlRow[0] != NULL
                && strtol(mysqlRow[0], NULL, 10) > 255 ) {
                listData->ile = 2;
            } else {
                listData->ile = 1;
            }
        }
        else if ( listData->ity == MYSQL_TYPE_NEWDECIMAL )
        {
            listData->precision = ( mysqlRow && mysqlRow[2] )
                ? ( int )strtol( mysqlRow[2], NULL, 10) : 10;
            listData->scale     = ( mysqlRow && mysqlRow[3] )
                ? ( int )strtol( mysqlRow[3], NULL, 10) : 0;
        }

        if ( mysqlRow != NULL && mysqlRow[1] != NULL )
        {
            new_cstring_add_string(listData->key, mysqlRow[1], strlen(mysqlRow[1]));
        }
#if FC_DEBUG
        printf("%d(%s)  ", listData->ity, mb_get_inner_mysql_type(listData->ity));
#endif

        new_list_push(tableMaps, listData);
    }
    e_memfree(buf);

    if ( mysqlRes != NULL )
    {
        mysql_free_result(mysqlRes);
        mysqlRes = NULL;
    }

    /* The NULL-bitmask holds one bit per column, rounded up to whole bytes. */
    nbt = ( int )( ( FCL_LIST_NUM_P( tableMaps) + 7 ) / 8 );
#if FC_DEBUG
    printf("\nNull-bitmask length: %d\n", nbt);
#endif

    /* column-meta-def and NULL-bitmask: the rest of the packet. */
    buf = NULL;
    res = mb_socket_read_str_len(mysql_fd, &buf, mph->plen);
    if ( !res )
    {
        fprintf(stderr, "Read column-meta-def & Null-bitmask wrong, msg: %s\n", strerror(errno));
        goto done;
    }
    mph->plen -= res;

#if FC_DEBUG
    /* Dump the bytes; the last nbt of them are labelled as the NULL-bitmask. */
    for ( i = 0; i < res; i++ )
    {
        if ( ( res - 1 - i ) < nbt )
        {
            printf("\nNull-bitmask: 0x%x(%d)\t", buf[i], buf[i]);
        }
        else
        {
            printf("0x%x(%d)\t", buf[i], buf[i]);
        }
    }
#endif
    e_memfree(buf);
#if FC_DEBUG
    printf("\n--------------------\n\n");
#endif

done:
    /* Shared exit: release whatever is still owned at this point. */
    free(schemaName);
    if ( mysqlRes != NULL )
    {
        mysql_free_result(mysqlRes);
    }
}

/**
 * @brief NOTICE
 * Parse one rows event (INSERT | UPDATE | DELETE) and queue it for publishing.
 *
 * The function reads the post-header, the column count and the
 * columns-present bitmap(s), then decodes every row image column by column
 * with the metadata kept in tableMaps. Each decoded row becomes a JSON object;
 * the final document is pushed onto rmqData, ReadyJob is set and cond is
 * broadcast.
 *
 * @param mysql_fd     socket the binlog stream is read from
 * @param mph          packet header; plen is decreased by every byte consumed
 * @param bh           binlog event header (not used by this function)
 * @param t            multiplier for the columns-present bitmap length
 *                     (callers pass 2 for UPDATE and 1 otherwise)
 * @param etyp         value stored in the JSON "etype" field
 * @param mysql_event  binlog event type, used to look up the post-header length
 *
 * JSON layout: {"etype": etyp, "data": row}; for E_RMQ_UPDATE the "data"
 * member is {"pre": row, "new": row}.
 *
 * NOTE(review): the column count is read as a single byte and the bitmaps are
 * folded into integer variables, so very wide tables are presumably not
 * supported - confirm against copy_data_from_little / bitsets_in_var.
 * NOTE(review): tableMaps is iterated without a NULL check; confirm that a
 * table map event always precedes a rows event.
 */
void mb_parse_rows_event( int mysql_fd, MYSQL_PACKET_HEADER *mph, BINLOG_HEADER *bh , int t/* Just for compability */, int etyp, int mysql_event)
{
    ssize_t         res;
    unsigned long   tableId;
    unsigned long   nbt;
    unsigned long   count;
    int             num;
    FCL_NODE       *fnode;
    LIST_DATA      *listData;
    char           *buff;
    char           *vuff;
    /* All JSON handles start as NULL so that every exit path can release them safely */
    struct cJSON   *root = NULL;
    struct cJSON   *data = NULL;
    struct cJSON   *temp = NULL;
    
    if ( tableMaps == NULL ){
        nbt = 1;
    } else {
        nbt = ( FCL_LIST_NUM_P(tableMaps) + 7 ) / 8;
    }
    count = nbt * t; /* Update need to be 2 */
    
    if ( FORMAT_description_DATA.eventsLengths[mysql_event - 1] == 6 )
    {
        struct _HEADER {
        unsigned char tableId[4];
        unsigned char flags[2];
        unsigned char extraDataLength[2];
        /* Only when before field big than 2, extra_data have space */
        } header;
    
        e_memzero(&header, sizeof(header));
    
        res = mb_socket_read(mysql_fd, &header, sizeof(header));
    
        /* The table id is only 4 bytes wide here; decoding 6 would swallow the flags */
        tableId = ( unsigned long )copy_data_from_little( ( char * )header.tableId, 4);
    } else {
        struct _HEADER {
        unsigned char tableId[6];
        unsigned char flags[2];
        unsigned char extraDataLength[2];
        /* Only when before field big than 2, extra_data have space */
        } header;
    
        e_memzero(&header, sizeof(header));
    
        res = mb_socket_read(mysql_fd, &header, sizeof(header));
    
        tableId = ( unsigned long )copy_data_from_little( ( char * )header.tableId, 6);
    }
    /* The table id is decoded for completeness only; columns come from tableMaps */
    ( void )tableId;
    
    if ( !res )
    {
#if FC_DEBUG
        fprintf(stderr, " Read the Header wrong, msg: %s\n", strerror(errno));
#endif
        return ;
    }
    mph->plen -= res;
    
    struct _LENGTH {
        unsigned char len;
    } length;
    
    res = mb_socket_read(mysql_fd, &length, sizeof(length));
    if ( !res )
    {
#if FC_DEBUG
        fprintf(stderr, "Read Number of columns wrong, msg: %s\n", strerror(errno));
#endif
        return ;
    }
    mph->plen -= res;
    
/**
 * @brief NOTICE
 * Reading the columns-present bitmap(s): nbt bytes each, t of them
 */
    buff = NULL;
    res  = mb_socket_read_str_len(mysql_fd, &buff, sizeof(char) * count);
    if ( !res )
    {
#if FC_DEBUG
        fprintf(stderr, "Read the mask of the columns wrong, msg: %s\n", strerror(errno));
#endif
        return ;
    }
    mph->plen -= res;
    
/**
 * @brief NOTICE
 * Count the present columns over the whole first bitmap (nbt bytes).
 * Every row starts with a NULL bitmap of one bit per present column.
 */
    long bitsets = bitsets_in_var(
        ( int )copy_data_from_little( buff, ( int )nbt ) );
    nbt = ( unsigned long )( ( bitsets + 7 ) / 8 );
    
    free(buff);
    
/**
 * @brief NOTICE
 * Generate the JSON structure
 */
    root = cJSON_CreateObject();
    cJSON_AddNumberToObject(root, "etype", etyp);
    
    if ( etyp == E_RMQ_UPDATE ) {
        data = cJSON_CreateObject();
    }
    
    num = 1;
    while ( mph->plen )
    {
        buff = NULL;
        res = mb_socket_read_str_len(mysql_fd, &buff, nbt);
        if ( !res )
        {
#if FC_DEBUG
            fprintf(stderr, "Read Rows bitmap wrong, msg: %s\n", strerror(errno));
#endif
            goto fail;
        }
/**
 * @brief NOTICE
 * Get each column mask
 */
        long long cmask = copy_data_from_little( buff, ( int )nbt );
        
        free(buff);
        mph->plen -= res;
        
/**
 * @brief NOTICE
 * For non-UPDATE events only the last decoded row is published, so the row
 * decoded by the previous iteration (still detached) is released here.
 * For UPDATE events temp is NULL at this point because it was handed to data.
 * NOTE(review): multi-row INSERT / DELETE events therefore publish one row only.
 */
        cJSON_Delete(temp);
        temp = cJSON_CreateObject();
        
        FCL_LIST_FOREACH_HEAD(tableMaps, fnode) {
            
            listData = FCL_NODE_DATA_P(fnode);
            
            if(!listData) {
                continue;
            }
/**
 * @brief NOTICE
 * Check whether the column read now is NULL or not.
 * If it is NULL no bytes follow for it, so skip it.
 */
#if FC_DEBUG
            printf("%s:", listData->key->s);
#endif
            
            if ( (cmask & 1) == 1 ) {
                cJSON_AddNullToObject(temp, listData->key->s);
                cmask = cmask >> 1;
#if FC_DEBUG
                printf("Null\t");
#endif
                continue;
            } else { cmask = cmask >> 1; }
            
            /* Fixed-size part of the column (or its length prefix) */
            buff = NULL;
            res = mb_socket_read_str_len( mysql_fd, &buff, ( size_t )listData->ile );
            if ( !res )
            {
#if FC_DEBUG
                fprintf(stderr, "Reading cloumn[%d] wrong, msg: %s\n", listData->id, strerror(errno));
#endif
                goto fail;
            }
            mph->plen -= res;
            
            switch (listData->ity)
            {
                case MYSQL_TYPE_BLOB: /*MYSQL_TYPE_BLOB*/
                {
                    vuff = NULL;
                    long len = copy_data_from_little(buff, 2);
                    res  = mb_socket_read_str_len( mysql_fd, &vuff, ( size_t )len );
                    if ( !res )
                    {
#if FC_DEBUG
                        fprintf(stderr, "Read BLOB length wrong, msg: %s\n", strerror(errno));
#endif
                        free(buff);
                        goto fail;
                    }
                    mph->plen -= res;

                    /* Express the BLOB type as string */
                    cJSON_AddStringToObject(temp, listData->key->s, vuff);
#if FC_DEBUG
                    {
                        ssize_t k;
                        printf("BLOB: ");
                        for ( k = 0; k < res; k++ )
                        {
                            printf("0x%02x ", (unsigned char)(vuff[k]));
                        }
                        printf("\t");
                    }
#endif
                    e_memfree(vuff);
                    break;
                }
                case MYSQL_TYPE_NEWDECIMAL: /* MYSQL_TYPE_DECIMAL */
                {
                    int digitsPerInteger  = 9;
                    /* Bytes needed to store 0..9 leftover decimal digits */
                    int compressedBytes[] = {
                        0,
                        1,
                        1,
                        2,
                        2,
                        3,
                        3,
                        4,
                        4,
                        4
                    };
                    int integral          = listData->precision - listData->scale;
                    int uncompIntegral    = ( integral / digitsPerInteger );
                    int uncompFractional  = ( listData->scale / digitsPerInteger );
                    int compIntegral      = integral - ( uncompIntegral * digitsPerInteger );
                    /* Leftover fractional digits come from the scale, not the precision;
                     * using the precision could index past the end of compressedBytes */
                    int compFractional    = listData->scale - ( uncompFractional * digitsPerInteger );
    
                    int     mask, size, value;
                    CSTRING *decimalRes   = new_cstring();
    
                    if ( (buff[ 0 ] & 0x80) != 0 )
                    {
                        mask = 0;
                    } else
                    {
                        mask = -1;
                        new_cstring_add_char( decimalRes, '-' );
                    }
    
                    size = compressedBytes[ compIntegral ];
                    if ( size > 0 )
                    {
                        value = ( int )( copy_number_from_big_stream( buff + 1, 4 ) ^ mask );
        
                        new_cstring_add_long_long( decimalRes, value );
                    }
                    new_cstring_add_char(decimalRes, '.');
                    
                    size = compressedBytes[ compFractional ];
                    if ( size > 0 )
                    {
                        value = (int) (copy_number_from_big_stream( buff + 1, 4));
                        new_cstring_add_long_long(decimalRes, value);
                    }
                    
                    /* NOTE(review): the decimal is only printed, never added to the JSON
                     * row, and both halves are decoded from the same bytes - the decoder
                     * looks unfinished; confirm before publishing this value. */
#if FC_DEBUG
                    new_cstring_0(decimalRes);
                    printf("%s\n", decimalRes->s);
#endif
                    new_cstring_free(decimalRes);
                    
                    break;
                }
                case MYSQL_TYPE_FLOAT:
                {
                    float var = copy_float_from_little_stream(buff, listData->ile);
                    #if FC_DEBUG
                    printf("%f\t", var);
                    #endif
                    cJSON_AddNumberToObject(temp, listData->key->s, var);
    
                    break;
                }
                case MYSQL_TYPE_DOUBLE:
                {
                    double var = copy_double_from_little_stream(buff, listData->ile);
                    #if FC_DEBUG
                    printf("%f\t", var);
                    #endif
                    cJSON_AddNumberToObject(temp, listData->key->s, var);

                    break;
                }
                case MYSQL_TYPE_BIT:
                {
                    cJSON_AddNumberToObject(temp, listData->key->s, buff[0]);
#if FC_DEBUG
                    printf("%d\t", buff[0]);
#endif
                    break;
                }
                case MYSQL_TYPE_BINARY:
                {
                    vuff = NULL;
                    /* The length byte is unsigned; a plain char may be negative for >= 128 */
                    res  = mb_socket_read_str_len(mysql_fd, &vuff, ( size_t )( unsigned char )buff[0]);
                    if ( !res )
                    {
                        fprintf(stderr, "Read Binary data length wrong, msg: %s\n", strerror(errno));
                        free(buff);
                        goto fail;
                    }
                    mph->plen -= res;
                    #if FC_DEBUG
                        printf("%s\t", vuff);
                    #endif
                    
                    cJSON_AddStringToObject(temp, listData->key->s, vuff);
                    e_memfree(vuff);
                    break;
                }
                case MYSQL_TYPE_VARCHAR:
                {
/**
 * @brief NOTICE
 * If varchar type, the bytes read so far are the length prefix (ile bytes);
 * the real data follows and is read with that length
 */
                    vuff = NULL;
                    e_memzero(wchar2int.wchar, sizeof(wchar2int.wchar));
                    e_copymem(wchar2int.wchar, buff, ( size_t )listData->ile);
                    res = mb_socket_read_str_len(mysql_fd, &vuff, sizeof(char) * wchar2int.wint);
                    if ( !res )
                    {
#if FC_DEBUG
                        fprintf(stderr, "Get Real VARCHAR wrong, msg: %s\n", strerror(errno));
#endif
                        free(buff);
                        goto fail;
                    }
                    mph->plen -= res;
#if FC_DEBUG
                    printf("%s\t", vuff);
#endif
                    cJSON_AddStringToObject(temp, listData->key->s, vuff);
                    free(vuff);
                    break;
                }
                case MYSQL_TYPE_LONG:
                    fchar2long.flong = ( unsigned long )copy_data_from_little( buff, 4);
                    cJSON_AddNumberToObject(temp, listData->key->s, fchar2long.flong);
#if FC_DEBUG
                    printf("%ld\t", fchar2long.flong);
#endif
                    break;
                case MYSQL_TYPE_DATE:
                {
                    /* 3 bytes: year in bits 9.., month in bits 5..8, day in bits 0..4 */
                    long long date = copy_data_from_little( buff, 3 );
                    if ( date != 0 )
                    {
                        int dateYear  = ( int )( ( date & ( ( ( 1 << 15 ) - 1 ) << 9 ) ) >> 9 );
                        int dateMonth = ( int )( ( date & ( ( ( 1 << 4 ) - 1 ) << 5 ) ) >> 5 );
                        int dateDay   = ( int )( date & ( ( 1 << 5 ) - 1 ) );
                        if ( dateDay == 0 || dateMonth == 0 || dateYear == 0 )
                        {
                            cJSON_AddStringToObject(temp, listData->key->s, "0000-00-00");
#if FC_DEBUG
                            printf( "0000-00-00\t" );
#endif
                        } else
                        {
                            CSTRING *dateTime = new_cstring();
                            new_cstring_add_long_long(dateTime, dateYear);
                            new_cstring_add_char(dateTime, '-');
                            new_cstring_add_long_long(dateTime, dateMonth);
                            new_cstring_add_char(dateTime, '-');
                            new_cstring_add_long_long(dateTime, dateDay);
                            new_cstring_0(dateTime);
                            cJSON_AddStringToObject(temp, listData->key->s, dateTime->s);
                            new_cstring_free(dateTime);
#if FC_DEBUG
                            printf( "%d-%d-%d\t", dateYear, dateMonth, dateDay );
#endif
                        }
                    } else
                    {
                        cJSON_AddStringToObject(temp, listData->key->s, "0000-00-00");
#if FC_DEBUG
                        printf( "0000-00-00\t" );
#endif
                    }
                    break;
                }
                case MYSQL_TYPE_TIME2:
                {
/**
 * @brief NOTICE
 * TIME2 is stored in 3 bytes in big endian order; the bytes are reversed
 * into fchar2long before the bit fields are sliced out.
 */
                    e_memzero(&fchar2long, sizeof(fchar2long));
                    fchar2long.fchar[2] = ( unsigned char )buff[0];
                    fchar2long.fchar[1] = ( unsigned char )buff[1];
                    fchar2long.fchar[0] = ( unsigned char )buff[2];
                    long     dateTime2 = fchar2long.flong;
                    long     signedValue = 1;
                    long     hour, minute, second;
                    if ( dateTime2 >= 0x800000 ) {
                        dateTime2 -= 0x1000000;
                    }
                    if ( !binlog_slice_bit(dateTime2, 0, 1, 24) )
                    {
                        signedValue = -1;
                        dateTime2 = ~dateTime2 + 1;
                    }
                    hour   = signedValue * binlog_slice_bit(dateTime2, 2, 10, 24);
                    minute = binlog_slice_bit(dateTime2, 12, 6, 24);
                    second = binlog_slice_bit(dateTime2, 18, 6, 24);
                    
                    CSTRING *time2 = new_cstring();
                    new_cstring_add_long_long(time2, hour);
                    new_cstring_add_char(time2, ':');
                    new_cstring_add_long_long(time2, minute);
                    new_cstring_add_char(time2, ':');
                    new_cstring_add_long_long(time2, second);
                    new_cstring_0(time2);
                    
                    cJSON_AddStringToObject(temp, listData->key->s, time2->s);
                    new_cstring_free(time2);
#if FC_DEBUG
                    printf("%ld:%ld:%ld\t", hour, minute, second);
#endif
                    break;
                }
                case MYSQL_TYPE_TIMESTAMP:
                    /* fallthrough: decoded exactly like TIMESTAMP2 */
                case MYSQL_TYPE_TIMESTAMP2:
                {
/**
 * @brief NOTICE
 * The value is handled as seconds since 1970 stored in 4 big endian bytes;
 * the bytes are reversed into ffchar2long and rendered with localtime().
 */
                    e_memzero(&ffchar2long, sizeof(ffchar2long));
                    ffchar2long.fchar5[3] = ( unsigned char )buff[0];
                    ffchar2long.fchar5[2] = ( unsigned char )buff[1];
                    ffchar2long.fchar5[1] = ( unsigned char )buff[2];
                    ffchar2long.fchar5[0] = ( unsigned char )buff[3];
                    /* Convert by value: casting a long pointer to time_t * is not portable */
                    time_t timeSecond = ( time_t )ffchar2long.fflong;
                    struct tm *time_v    = localtime( &timeSecond );
                    
                    CSTRING *dateTime = new_cstring();
                    new_cstring_add_long_long( dateTime, time_v->tm_year + 1900 );
                    new_cstring_add_char( dateTime, '-');
                    new_cstring_add_long_long( dateTime, time_v->tm_mon + 1 );
                    new_cstring_add_char( dateTime, '-');
                    new_cstring_add_long_long( dateTime, time_v->tm_mday );
                    new_cstring_add_char( dateTime, ' ');
                    new_cstring_add_long_long( dateTime, time_v->tm_hour );
                    new_cstring_add_char( dateTime, ':');
                    new_cstring_add_long_long( dateTime, time_v->tm_min );
                    new_cstring_add_char( dateTime, ':');
                    new_cstring_add_long_long( dateTime, time_v->tm_sec );
                    new_cstring_0(dateTime);
                    
                    cJSON_AddStringToObject(temp, listData->key->s, dateTime->s);
                    new_cstring_free(dateTime);
#if FC_DEBUG
                    printf( "%d-%d-%d %d:%d:%d\t", time_v->tm_year + 1900, time_v->tm_mon + 1, time_v->tm_mday, time_v->tm_hour, time_v->tm_min,
                            time_v->tm_sec );
#endif
                    break;
                }
                case MYSQL_TYPE_DATETIME2:
                {
/**
 * @brief NOTICE
 * DateTime2 is 5 bytes, read in the given order
 * buff[0] buff[1] buff[2] buff[3] buff[4]
 * and reversed into schar2long before slicing the bit fields.
 */
                    e_memzero(&schar2long, sizeof(schar2long));
                    schar2long.schar[4] = ( unsigned char )buff[0];
                    schar2long.schar[3] = ( unsigned char )buff[1];
                    schar2long.schar[2] = ( unsigned char )buff[2];
                    schar2long.schar[1] = ( unsigned char )buff[3];
                    schar2long.schar[0] = ( unsigned char )buff[4];
                    long dateBuff = schar2long.slong;

                    /* Year and month share one field encoded as year * 13 + month */
                    long dateYearMonth = binlog_slice_bit(dateBuff, 1, 17, 40);
                    int dateYear  = ( int )( dateYearMonth / 13 );
                    int dateMonth = ( int )( dateYearMonth % 13 );

                    int dateDay   = ( int )binlog_slice_bit( dateBuff, 18, 5, 40);
                    int dateHour  = ( int )binlog_slice_bit( dateBuff, 23, 5, 40);
                    int dateMinute= ( int )binlog_slice_bit( dateBuff, 28, 6, 40);
                    int dateSecond= ( int )binlog_slice_bit( dateBuff, 34, 6, 40);

                    CSTRING *dateTime = new_cstring();
                    new_cstring_add_long_long(dateTime, dateYear);
                    new_cstring_add_char(dateTime, '-');
                    new_cstring_add_long_long(dateTime, dateMonth);
                    new_cstring_add_char(dateTime, '-');
                    new_cstring_add_long_long(dateTime, dateDay);
                    new_cstring_add_char(dateTime, ' ');
                    new_cstring_add_long_long(dateTime, dateHour);
                    new_cstring_add_char(dateTime, ':');
                    new_cstring_add_long_long(dateTime, dateMinute);
                    new_cstring_add_char(dateTime, ':');
                    new_cstring_add_long_long(dateTime, dateSecond);
                    new_cstring_0(dateTime);
#if FC_DEBUG
                    printf("%s\t", dateTime->s);
#endif
                    cJSON_AddStringToObject(temp, listData->key->s, dateTime->s);
                    new_cstring_free(dateTime);
                    break;
                }
                default:
                    /* Every other type is treated as a little endian integer of ile bytes */
                    fchar2long.flong = ( unsigned long )copy_data_from_little( buff, listData->ile);
                    cJSON_AddNumberToObject(temp, listData->key->s, fchar2long.flong );
#if FC_DEBUG
                    printf("%ld\t", fchar2long.flong);
#endif
                    break;
            }
/**
 * @brief NOTICE
 *  Release the column buffer after use
 */
            free(buff);
            
        } FCL_LIST_FOREACH_END();
        
        if ( etyp == E_RMQ_UPDATE )
        {
            if ( num == 1 ) {
                cJSON_AddItemToObject(data, "pre", temp);
            } else {
                cJSON_AddItemToObject(data, "new", temp);
            }
            /* The row now belongs to data; forget it so it is never released twice */
            temp = NULL;
        }
        num++;
#if FC_DEBUG
        printf("\n");
#endif
    }
    
    if ( etyp == E_RMQ_UPDATE )
    {
        cJSON_AddItemToObject(root, "data", data);
    }
    else if ( temp )
    {
        /* temp stays NULL when the event carried no row at all */
        cJSON_AddItemToObject(root, "data", temp);
    }
    
/**
 * @brief NOTICE
 * Add to the sending list waiting for publishing, then notify the
 * worker thread to do the job.
 */
    fcl_list_push(rmqData, new_rmq_node(cJSON_PrintUnformatted(root)), trash_rmq_node);
    ReadyJob = 1;
    pthread_cond_broadcast(&cond);
    cJSON_Delete(root);
    
#if FC_DEBUG
    printf("\n\n");
#endif
    return ;

fail:
/**
 * @brief NOTICE
 * A read failed after the JSON tree was started: nothing is published.
 * temp and data are still detached from root here, so each one is
 * released on its own (cJSON_Delete ignores NULL).
 */
    cJSON_Delete(temp);
    cJSON_Delete(data);
    cJSON_Delete(root);
}

/**
 * @brief NOTICE
 * Handle the INSERT (write rows) event by delegating to mb_parse_rows_event.
 * The packet has the same layout as the DELETE one, so the shared parser is
 * simply invoked with the INSERT specific arguments.
 */
void mb_parse_write_rows_event( int mysql_fd, MYSQL_PACKET_HEADER *mph, BINLOG_HEADER *bh )
{
    /* Multiplier applied by mb_parse_rows_event to the column bitmap length */
    const int bitmap_count = 1;

    mb_parse_rows_event(
        mysql_fd, mph, bh,
        bitmap_count,
        E_RMQ_INSERT,
        WRITE_ROWS_EVENT
    );
}

/**
 * @brief NOTICE
 * Handle the DELETE (delete rows) event by delegating to mb_parse_rows_event.
 * The packet has the same layout as the WRITE one, so the shared parser is
 * simply invoked with the DELETE specific arguments.
 */
void mb_parse_delete_rows_event( int mysql_fd, MYSQL_PACKET_HEADER *mph, BINLOG_HEADER *bh )
{
    /* Multiplier applied by mb_parse_rows_event to the column bitmap length */
    const int bitmap_count = 1;

    mb_parse_rows_event(
        mysql_fd, mph, bh,
        bitmap_count,
        E_RMQ_DELETE,
        DELETE_ROWS_EVENT
    );
}

/**
 * @brief NOTICE
 * Parse the XID event: read the 8 byte transaction id from the stream and
 * queue a {"etype": E_RMQ_SQL_QUERY, "data": "COMMIT"} message on rmqData.
 *
 * mph->plen is decreased by the number of bytes read; bh is not used.
 *
 * NOTE(review): unlike mb_parse_rows_event, nothing here sets ReadyJob or
 * broadcasts cond after the push - confirm the publisher still picks the
 * COMMIT message up in time.
 */
void mb_parse_xid_event( int mysql_fd, MYSQL_PACKET_HEADER *mph, BINLOG_HEADER *bh )
{
    unsigned char   xid[8];
    ssize_t         nread;
    struct cJSON   *msg;
    
    e_memzero(xid, sizeof(xid));
    
    nread = mb_socket_read(mysql_fd, xid, sizeof(xid));
    if ( nread == 0 )
    {
        logger(LOG_ERROR, "Reading the XID Event wrong, msg: %s\n", strerror(errno));
        return ;
    }
    mph->plen -= nread;
    
    /* Stage the raw bytes in the shared union so they can be read as a number */
    e_memzero(echar2long.echar, sizeof(echar2long.echar));
    e_copymem(echar2long.echar, xid, sizeof(echar2long.echar));
    
#if FC_DEBUG
    printf("Transaction ID: %ld\n\n", echar2long.elong);
#endif
    
/**
 * @brief NOTICE
 * Build the JSON RabbitMQ message and hand its text form to the send list
 */
    msg = cJSON_CreateObject();
    cJSON_AddNumberToObject(msg, "etype", E_RMQ_SQL_QUERY);
    cJSON_AddStringToObject(msg, "data", "COMMIT");
    fcl_list_push(rmqData, new_rmq_node(cJSON_PrintUnformatted(msg)), trash_rmq_node);
    cJSON_Delete(msg);
}

/**
 * @brief NOTICE
 * Parse the binlog event stream.
 *
 * Loops forever: reads one MySQL packet header, then reads binlog event
 * headers from that packet and dispatches every event to its parser until
 * the packet length (mph.plen) is used up. Events without a parser are
 * read and thrown away.
 *
 * The function returns as soon as any read on mysql_fd fails (result <= 0),
 * because after a failed read the position inside the stream is unknown.
 *
 * @param mysql_fd  socket connected to the MySQL master
 */
void mb_parse_binlog(int mysql_fd)
{
    ssize_t             res;
    MYSQL_PACKET_HEADER mph;
    BINLOG_HEADER        bh;
    
    /* Scratch space used only to drain events that are not supported */
    char buff[100000];
    
    while ( TRUE )
    {
        e_memzero(&mph, sizeof(mph));
        res = mb_socket_read(mysql_fd, &mph, sizeof(mph));
        if ( res <= 0 ) {
            logger(LOG_ERROR, "Read MySQL packet error, msg: %s\n", strerror(errno));
            break;
        }
        
        while ( mph.plen )
        {
/**
 * @brief NOTICE
 * Read the mysql packet one by one
 */
            e_memzero(&bh, sizeof(bh));
            res = mb_socket_read(mysql_fd, &bh, sizeof(bh));
            if ( res <= 0 ){
                logger(LOG_ERROR, "Read MySQL binlog header packet error, msg: %s\n", strerror(errno));
                return ;
            }
/**
 * @brief NOTICE
 * After reading the res length of data, minus it.
 */
            mph.plen -= res;
            
            switch ( bh.event_type )
            {
                case QUERY_EVENT:
#if FC_DEBUG
                    printf("Query Event\n");
#endif
                    mb_parse_query_event(mysql_fd, &mph, &bh);
                    break;
                case ROTATE_EVENT:
#if FC_DEBUG
                    printf("Rotate Event\n");
#endif
                    mb_parse_rotate_event(mysql_fd, &mph, &bh);
                    break;
                case FORMAT_DESCRIPTION_EVENT:
#if FC_DEBUG
                    printf("Format Description Event\n");
#endif
                    mb_parse_format_desc_event(mysql_fd, &mph, &bh);
                    break;
                case TABLE_MAP_EVENT:
#if FC_DEBUG
                    printf("Table Map Event\n");
#endif
                    mb_parse_table_map_event(mysql_fd, &mph, &bh);
                    break;
                case UPDATE_ROWS_EVENT: /* 0x1f */
#if FC_DEBUG
                    printf("Update Rows Event\n");
#endif
                    mb_parse_rows_event(mysql_fd, &mph, &bh, 2, E_RMQ_UPDATE, UPDATE_ROWS_EVENT);
                    break;
                case XID_EVENT:
#if FC_DEBUG
                    printf("XID Event\n");
#endif
                    mb_parse_xid_event(mysql_fd, &mph, &bh);
                    break;
                case WRITE_ROWS_EVENT: /* 0x1e */
#if FC_DEBUG
                    printf("Write Rows Event\n");
#endif
                    mb_parse_write_rows_event(mysql_fd, &mph, &bh);
                    break;
                case DELETE_ROWS_EVENT:
#if FC_DEBUG
                    printf("Delete Rows Event\n");
#endif
                    mb_parse_delete_rows_event(mysql_fd, &mph, &bh);
                    break;
                default:
/**
 * @brief NOTICE
 * Events that are not supported end up here: the rest of the packet is
 * read and thrown away. The data is drained in pieces no larger than the
 * scratch buffer so that a big event can never overflow it.
 */
                    while ( mph.plen )
                    {
                        size_t chunk = sizeof(buff);
                        if ( ( size_t )mph.plen < chunk ) {
                            chunk = ( size_t )mph.plen;
                        }
                        res = mb_socket_read(mysql_fd, buff, chunk);
                        if ( res <= 0 ) {
                            logger(LOG_ERROR, "Skip MySQL binlog event error, msg: %s\n", strerror(errno));
                            return ;
                        }
                        mph.plen -= res;
                    }
#if FC_DEBUG
                    printf("Other Events, event number: 0x%02x\n\n", bh.event_type);
#endif
                    break;
            }
        }
        
    }
}

/**
 * @brief NOTICE
 * This function is the main function for the Salve to Sync data from the MySQL master
 */
void mb_begin_slave()
{
    MYSQL           *mysql;
    MYSQL_ROW        row;
    MYSQL_RES       *mysqlRes;
    ssize_t          res;
    unsigned int     port;
    unsigned long    seid;
    char     *host, *user;
    char     *conf;
    CXML     *cxml;
    long      len;
    char     *password;
    sigset_t          sset;
    struct sigaction  sig;
    
/**
 * @brief NOTICE
 * Keep the signal to be promasked
 */
    e_memzero(&sig, sizeof(struct sigaction));
    sig.sa_sigaction = sig_callback;
    sig.sa_flags = SA_SIGINFO;
    
    sigemptyset(&sig.sa_mask);
    sigaddset(&sig.sa_mask, SIGUSR1);
    sigaction(SIGUSR1, &sig, NULL);
    
    sigemptyset(&sig.sa_mask);
    sigaddset(&sig.sa_mask, SIGUSR2);
    sigaction(SIGUSR2, &sig, NULL);
    
    sigfillset(&sset);
    sigdelset(&sset, SIGUSR1);
    sigdelset(&sset, SIGUSR2);
    sigprocmask(SIG_BLOCK, &sset, NULL);
    
/**
 * @brief Introduction
 * To find whether the cknit is running or not
 */
    conf = e_data_from_file(PIDFILE, NULL);
    if (conf)
    {
        errno = 0;
        int pd = ( int )strtol( conf, NULL, 10);
        if ( pd != 0 )
        {
            int re = kill( pd, 0);
            if ( re == 0 || ( re != -1 || errno == EPERM ) )
            {
                fprintf( stderr, "mbinlogmq:[PID: %d] has been started before.\n", pd );
                return;
            }
        }
    }
    e_write_to_file(PIDFILE, "%d", getpid());
/**
 * @brief NOTICE
 * Read the xml configuration file
 */
    res = access(CONF_FILE, F_OK | R_OK);
    if ( res == -1 )
    {
        fprintf(stderr, "Config file:[%s] not exists or permission not right.\n", CONF_FILE );
        return ;
    }
    len  = 0;
    conf = e_data_from_file(CONF_FILE, &len);
    cxml = new_cxml_from_string2(conf, ( unsigned long )len);
    if ( !cxml )
    {
        fprintf(stderr, "%s\n", new_cxml_get_error());
        return ;
    }

/**
 * @brief NOTICE
 * Parse the XML node to the given structure
 */
    CXML_binlog *binlog = NEW_CXML_binlog_FROM_DATA(cxml->data);
    if ( !binlog || binlog->__oth )
    {
        goto terminate_process;
    }
    CXML_system *system = NEW_CXML_system_FROM_DATA(binlog->system->val);
    if ( !system || system->__oth )
    {
        goto terminate_process;
    }
    cxmlRmq = NEW_CXML_rmq_FROM_DATA(binlog->rabbitmq->val);
    if ( !cxmlRmq || cxmlRmq->__oth )
    {
        goto terminate_process;
    }
    
/**
 * @brief NOTICE
 * Read the data from the XML config file.
 * and verify it with the given rule.
 */
    host = system->host->sval->s;
    user = system->user->sval->s;
    password = system->password->sval->s;
    port = (unsigned int)strtol(system->port->sval->s, NULL, 10);
    seid = (unsigned long)strtol(system->server_id->sval->s, NULL, 10);
    
    mysql         = mysql_init(NULL);
    queryInstance = mysql_init(NULL);
    
    if ( !host || !user || !password || !port || !seid )
    {
        goto terminate_process;
    }
/**
 * @brief NOTICE
 * Check the RabbitMQ's config ok
 */
    if ( !cxmlRmq->vhost->sval->l ||
         !cxmlRmq->name->sval->l ||
         !cxmlRmq->routing_key->sval->l||
         !cxmlRmq->delivery_mode->sval->l )
    {
        goto terminate_process;
    }
    
/**
 * @brief NOTICE
 * Worker thread
 */
    pthread_t pth;
    pthread_create(&pth, NULL, rmq_publish, &pth);
    
/**
 * @brief NOTICE
 * Daemon or not.
 */
    if ( strtol(system->daemon->sval->s, NULL, 10) ) {
#if !FC_DEBUG
        if ( fork() > 0) exit(0);
#endif
    }
    
/**
 * @brief NOTICE
 * Send the handshark heartpacket
 */
    mysql = mysql_real_connect(mysql, host, user, password, NULL, port, NULL, 0 );
    queryInstance = mysql_real_connect(queryInstance, host, user, password, NULL, port, NULL, 0 );
    
    if ( mb_register_slave( mysql->net.fd, 0, seid,
        host,     ( unsigned char )system->host->sval->l,
        user,     ( unsigned char )system->user->sval->l,
        password, ( unsigned char )system->password->sval->l,
        port) ) {
        printf("Register Slave successfully!\n");
    }
    
/**
 * @brief NOTICE
 * Send the GLOBAL BINLOGCHECKSUM off
 * and Get the last binlog pos. and filename
 */
    mysql_real_query(mysql, E_STRL("SET GLOBAL BINLOG_CHECKSUM=NONE") );
    mysql_real_query(mysql, E_STRL("SHOW MASTER STATUS"));
    mysqlRes = mysql_store_result(mysql);
    row = mysql_fetch_row(mysqlRes);
    
    if ( mb_binlog_dump( mysql->net.fd,
        ( unsigned long)strtol(row[1], NULL, 10),
        seid,
        row[0], ( unsigned int )strlen( row[0] ) ) ){
        printf("OK for binlog dump\n\n\n\n");
    }
/**
 * @brief NOTICE
 * Begin Parse the MySQL binlog events.
 * TRASH_CXML_rmq(rmq);
 * trash_cxml(cxml);
 */
    TRASH_CXML_binlog(binlog);
    TRASH_CXML_system(system);

    rmqData = new_fcl_list();
    
    mb_parse_binlog(mysql->net.fd);
    
    if ( 0 ) {
terminate_process:
        fprintf(stderr, "Your xml config file not right. Please verify.");
        return ;
    }
    
}
